package com.alisonyu.spider;

import com.alisonyu.spider.Downloader.Download;
import com.alisonyu.spider.Downloader.VertxDownloader;
import com.alisonyu.spider.Proccessor.DefaultProcessor;
import com.alisonyu.spider.Proccessor.Process;
import com.alisonyu.spider.Router.Router;
import com.alisonyu.spider.Scheduler.ClusterClientScheduler;
import com.alisonyu.spider.Scheduler.MemoryScheduler;
import com.alisonyu.spider.Scheduler.RedisScheduler;
import com.alisonyu.spider.Scheduler.Schedule;
import com.hazelcast.config.Config;
import com.hazelcast.config.JoinConfig;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.spi.cluster.hazelcast.HazelcastClusterManager;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * Lazily builds and caches the collaborators of a spider (Vertx instance, router,
 * downloader, processor and scheduler) according to a {@link SpiderConfig}.
 *
 * <p>Only {@link #getVertx()} is guarded against concurrent initialisation; the other
 * getters perform unsynchronised lazy initialisation.
 */
public class SpiderFactory {
    // Timeout handed to the JedisPool constructor, in milliseconds.
    private static final int REDIS_TIMEOUT_MILLIS = 2000;

    private final SpiderConfig config;
    // Guards lazy creation of this instance's Vertx. A per-instance lock is used so that
    // a slow cluster join in one factory does not block unrelated factories.
    private final Object vertxLock = new Object();
    private volatile Vertx vertx;
    private Router router;
    // Downloader; defaults to VertxDownloader.
    private Download downloader;
    // Processor; defaults to DefaultProcessor.
    private Process processor;
    // Scheduler; standalone mode defaults to MemoryScheduler.
    private Schedule scheduler;

    /** Creates a factory backed by a default {@link SpiderConfig}. */
    public SpiderFactory(){
        this(new SpiderConfig());
    }

    /**
     * Creates a factory backed by the given configuration.
     *
     * @param config configuration consulted by every getter; must not be null
     * @throws NullPointerException if {@code config} is null
     */
    public SpiderFactory(SpiderConfig config){
        if (config == null) {
            throw new NullPointerException("config");
        }
        this.config = config;
    }

    /**
     * Returns the Vertx instance of this factory, creating it on first use.
     *
     * <p>The kind of instance depends on the configuration: a cluster client, a cluster
     * server, or a standalone instance when clustering is disabled.
     *
     * @return the cached Vertx instance, never null
     * @throws IllegalStateException if a clustered instance could not be started
     */
    public Vertx getVertx(){
        Vertx current = vertx;
        if (current != null) {
            return current;
        }
        synchronized (vertxLock){
            current = vertx;
            if (current == null){
                if (!config.isCluster()){
                    current = getStandAloneVertx();
                } else if (config.isClient()){
                    current = getClusterClientVertx();
                } else {
                    current = getClusterServerVertx();
                }
                vertx = current;
            }
        }
        return current;
    }

    /** Creates a non-clustered Vertx instance. */
    private Vertx getStandAloneVertx(){
        return Vertx.vertx();
    }

    /**
     * Starts a clustered Vertx whose Hazelcast node uses the configured cluster port and
     * public address, with TCP/IP discovery enabled and multicast disabled.
     *
     * @throws IllegalStateException if the clustered instance could not be started
     */
    private Vertx getClusterServerVertx(){
        Config hazelcastConfig = new Config();
        hazelcastConfig.getNetworkConfig()
                .setPort(config.getClusterPort())
                .setPublicAddress(config.getClusterAddress());
        // Discover the cluster over TCP/IP rather than multicast.
        JoinConfig joinConfig = hazelcastConfig.getNetworkConfig().getJoin();
        joinConfig.getTcpIpConfig().setEnabled(true);
        joinConfig.getMulticastConfig().setEnabled(false);
        return startClusteredVertx(hazelcastConfig);
    }

    /**
     * Starts a clustered Vertx that joins the cluster through the single member at the
     * configured {@code address:port}; multicast and AWS discovery are disabled.
     *
     * @throws IllegalStateException if the clustered instance could not be started
     */
    private Vertx getClusterClientVertx(){
        Config hazelcastConfig = new Config();
        JoinConfig joinConfig = hazelcastConfig.getNetworkConfig().getJoin();
        // Connect to one known member of the cluster (the central node).
        String member = config.getClusterAddress()+":"+config.getClusterPort();
        joinConfig.getTcpIpConfig().addMember(member).setEnabled(true);
        joinConfig.getMulticastConfig().setEnabled(false);
        joinConfig.getAwsConfig().setEnabled(false);
        return startClusteredVertx(hazelcastConfig);
    }

    /**
     * Starts a clustered Vertx with a Hazelcast cluster manager built from the given
     * configuration and blocks the calling thread until startup has finished.
     *
     * @param hazelcastConfig Hazelcast configuration for the cluster manager
     * @return the started Vertx instance
     * @throws IllegalStateException if startup fails (the Vert.x failure is attached as
     *         the cause) or the calling thread is interrupted while waiting
     */
    private Vertx startClusteredVertx(Config hazelcastConfig){
        ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
        VertxOptions options = new VertxOptions().setClusterManager(mgr);
        CompletableFuture<Vertx> future = new CompletableFuture<>();
        Vertx.clusteredVertx(options,res->{
            if (res.succeeded()){
                future.complete(res.result());
            }else{
                // Propagate the real cause instead of completing with null.
                future.completeExceptionally(res.cause());
            }
        });
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while starting clustered Vertx", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to start clustered Vertx", e.getCause());
        }
    }

    /**
     * Returns the event bus of this factory's Vertx instance.
     *
     * @throws IllegalStateException if a clustered Vertx could not be started
     */
    public EventBus getEventBus(){
        return getVertx().eventBus();
    }

    /**
     * Returns the router, creating it on first use: the configured router when one is
     * set, otherwise a new {@link Router}.
     */
    public Router getRouter(){
        if (router == null){
            Router configured = config.getRouter();
            router = (configured != null) ? configured : new Router();
        }
        return router;
    }

    /**
     * Returns the downloader, creating it on first use and applying the configured
     * maximum retry count to it.
     *
     * <p>When no downloader is configured, or the configured one is a
     * {@link VertxDownloader}, a new {@code VertxDownloader} bound to {@link #getVertx()}
     * is created; any other configured downloader is used as is.
     */
    public Download getDownloader(){
        if (downloader != null) {
            return downloader;
        }
        Download configured = config.getDownloader();
        if (configured == null || configured instanceof VertxDownloader){
            // NOTE(review): a configured VertxDownloader is replaced rather than reused,
            // presumably so that it is bound to this factory's Vertx — confirm.
            downloader = new VertxDownloader(getVertx());
        } else {
            downloader = configured;
        }
        downloader.setMaxRetryTime(config.getMaxRetryTime());
        return downloader;
    }

    /**
     * Returns the processor, creating it on first use.
     *
     * <p>When no processor is configured, or the configured one is a
     * {@link DefaultProcessor}, a new {@code DefaultProcessor} bound to
     * {@link #getRouter()} is created; any other configured processor is used as is.
     */
    public Process getProcessor(){
        if (processor != null) {
            return processor;
        }
        Process configured = config.getProcessor();
        if (configured == null || configured instanceof DefaultProcessor){
            processor = new DefaultProcessor(getRouter());
        } else {
            processor = configured;
        }
        return processor;
    }

    /**
     * Returns the scheduler, creating it on first use.
     *
     * <p>Without a configured scheduler, a {@link MemoryScheduler} is used when clustering
     * is disabled and a {@link ClusterClientScheduler} on the event bus otherwise. A
     * configured {@link RedisScheduler} is replaced by one built from this factory's Redis
     * settings; any other configured scheduler is used as is.
     */
    public Schedule getScheduler() {
        if (scheduler != null) {
            return scheduler;
        }
        Schedule configured = config.getScheduler();
        if (configured == null){
            scheduler = config.isCluster()
                    ? new ClusterClientScheduler(getEventBus())
                    : new MemoryScheduler();
        } else if (configured instanceof RedisScheduler){
            scheduler = getRedisScheduler();
        } else {
            scheduler = configured;
        }
        return scheduler;
    }

    /** Creates a new Jedis pool for the configured Redis host and port. */
    private JedisPool getJedisPool(){
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        return new JedisPool(poolConfig,config.getRedisHost(),config.getRedisPort(),REDIS_TIMEOUT_MILLIS);
    }

    /** Creates a Redis-backed scheduler using a freshly created Jedis pool. */
    private RedisScheduler getRedisScheduler(){
        return new RedisScheduler(getJedisPool(),config);
    }

}
